// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
import org.apache.doris.datasource.property.storage.HdfsProperties;
import org.apache.doris.datasource.property.storage.HdfsPropertiesUtils;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TParquetDataType;
import org.apache.doris.thrift.TParquetRepetitionType;
import org.apache.doris.thrift.TParquetSchema;
import org.apache.doris.thrift.TResultFileSinkOptions;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Represents the INTO OUTFILE clause of a query, i.e. the syntax: SELECT * FROM tbl INTO OUTFILE xxxx
public class OutFileClause {
    // Class logger.
    private static final Logger LOG = LogManager.getLogger(OutFileClause.class);

    // Result column names; filled by the static initializer below in the order
    // FILE_NUMBER, TOTAL_ROWS, FILE_SIZE, URL, WRITE_TIME_SEC, WRITE_SPEED_KB.
    public static final List<String> RESULT_COL_NAMES = Lists.newArrayList();
    // Result column types; filled by the static initializer in the same order as RESULT_COL_NAMES.
    public static final List<Type> RESULT_COL_TYPES = Lists.newArrayList();
    // Lower-case parquet repetition keyword -> thrift enum; consulted when parsing a user "schema" property.
    public static final Map<String, TParquetRepetitionType> PARQUET_REPETITION_TYPE_MAP = Maps.newHashMap();
    // Lower-case parquet physical type name -> thrift enum; consulted when parsing a user "schema" property.
    public static final Map<String, TParquetDataType> PARQUET_DATA_TYPE_MAP = Maps.newHashMap();
    // ORC type names accepted verbatim in a user "schema" property (decimal types are checked separately).
    public static final Set<String> ORC_DATA_TYPE = Sets.newHashSet();
    // Names registered in RESULT_COL_NAMES.
    public static final String FILE_NUMBER = "FileNumber";
    public static final String TOTAL_ROWS = "TotalRows";
    public static final String FILE_SIZE = "FileSize";
    public static final String URL = "URL";
    public static final String WRITE_TIME_SEC = "WriteTimeSec";
    public static final String WRITE_SPEED_KB = "WriteSpeedKB";

    static {
        RESULT_COL_NAMES.add(FILE_NUMBER);
        RESULT_COL_NAMES.add(TOTAL_ROWS);
        RESULT_COL_NAMES.add(FILE_SIZE);
        RESULT_COL_NAMES.add(URL);
        RESULT_COL_NAMES.add(WRITE_TIME_SEC);
        RESULT_COL_NAMES.add(WRITE_SPEED_KB);

        RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.INT));
        RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT));
        RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT));
        RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR));
        RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR));
        RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.VARCHAR));

        PARQUET_REPETITION_TYPE_MAP.put("required", TParquetRepetitionType.REQUIRED);
        PARQUET_REPETITION_TYPE_MAP.put("repeated", TParquetRepetitionType.REPEATED);
        PARQUET_REPETITION_TYPE_MAP.put("optional", TParquetRepetitionType.OPTIONAL);

        PARQUET_DATA_TYPE_MAP.put("boolean", TParquetDataType.BOOLEAN);
        PARQUET_DATA_TYPE_MAP.put("int32", TParquetDataType.INT32);
        PARQUET_DATA_TYPE_MAP.put("int64", TParquetDataType.INT64);
        PARQUET_DATA_TYPE_MAP.put("int96", TParquetDataType.INT96);
        PARQUET_DATA_TYPE_MAP.put("byte_array", TParquetDataType.BYTE_ARRAY);
        PARQUET_DATA_TYPE_MAP.put("float", TParquetDataType.FLOAT);
        PARQUET_DATA_TYPE_MAP.put("double", TParquetDataType.DOUBLE);
        PARQUET_DATA_TYPE_MAP.put("fixed_len_byte_array", TParquetDataType.FIXED_LEN_BYTE_ARRAY);

        ORC_DATA_TYPE.add("bigint");
        ORC_DATA_TYPE.add("boolean");
        ORC_DATA_TYPE.add("double");
        ORC_DATA_TYPE.add("float");
        ORC_DATA_TYPE.add("int");
        ORC_DATA_TYPE.add("smallint");
        ORC_DATA_TYPE.add("string");
        ORC_DATA_TYPE.add("tinyint");
    }

    // A file path starting with this prefix is treated as local-disk output (see analyzeFilePath).
    public static final String LOCAL_FILE_PREFIX = "file:///";
    // Scheme prefix after which the configured HDFS name service is inserted (see analyzeFilePath).
    private static final String HDFS_FILE_PREFIX = "hdfs://";
    // Prefix put in front of the name-service key when a broker name is present in the properties.
    private static final String BROKER_PROP_PREFIX = "broker.";
    // Property key holding the broker name.
    public static final String PROP_BROKER_NAME = "broker.name";
    // Property keys for the CSV separators; not read directly by this class.
    public static final String PROP_COLUMN_SEPARATOR = "column_separator";
    public static final String PROP_LINE_DELIMITER = "line_delimiter";
    // Property keys consumed by analyzeProperties().
    public static final String PROP_MAX_FILE_SIZE = "max_file_size";
    private static final String PROP_SUCCESS_FILE_NAME = "success_file_name";
    public static final String PROP_DELETE_EXISTING_FILES = "delete_existing_files";
    public static final String PROP_FILE_SUFFIX = "file_suffix";
    public static final String PROP_WITH_BOM = "with_bom";

    // Property key of the optional user-declared parquet/orc schema.
    private static final String SCHEMA = "schema";

    // Value of maxFileSizeBytes when "max_file_size" is not given.
    private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB
    // Smallest accepted "max_file_size"; smaller values are rejected by analyzeProperties().
    private static final long MIN_FILE_SIZE_BYTES = 5 * 1024 * 1024L; // 5MB

    // Target path; rewritten by analyzeFilePath() (local prefix stripped, name service inserted, normalized).
    private String filePath;
    // Raw user properties of the clause; may be null.
    private Map<String, String> properties;

    // Values below are filled by analyzeProperties() from the matching property keys.
    private long maxFileSizeBytes = DEFAULT_MAX_FILE_SIZE_BYTES;
    private boolean deleteExistingFiles = false;
    private String fileSuffix = "";
    private boolean withBom = false;
    // Created by analyzeBrokerDesc() for non-local output; stays null for local output.
    private BrokerDesc brokerDesc = null;
    // True if result is written to local disk.
    // If set to true, the brokerDesc must be null.
    private boolean isLocalOutput = false;
    private String successFileName = "";

    // Parquet column descriptors: parsed from the "schema" property or generated from the column labels.
    private List<TParquetSchema> parquetSchemas = new ArrayList<>();

    // ORC columns as (column name, orc type) pairs: parsed from "schema" or generated from the result exprs.
    private List<Pair<String, String>> orcSchemas = new ArrayList<>();

    // Set once analyze() has validated the path and properties; later analyze() calls return immediately.
    private boolean isAnalyzed = false;

    // Format-specific settings, created in the constructor from the requested format name.
    private FileFormatProperties fileFormatProperties;

    public OutFileClause(String filePath, String format, Map<String, String> properties) {
        this.filePath = filePath;
        this.properties = properties;
        this.isAnalyzed = false;
        if (Strings.isNullOrEmpty(format)) {
            fileFormatProperties = FileFormatProperties.createFileFormatProperties("csv");
        } else {
            fileFormatProperties = FileFormatProperties.createFileFormatProperties(format.toLowerCase());
        }
    }

    /**
     * Copy constructor used by {@link #clone()}.
     * <p>
     * The property map is copied into a new map; the format properties object is shared.
     * When {@code other} has already been analyzed, everything analysis derived from the
     * path and properties is carried over as well. {@code analyze()} returns immediately
     * for an analyzed clause, so a copy that only inherited the flag would keep default
     * values (no broker descriptor, default file size, empty schemas) forever.
     * A copy of a not-yet-analyzed clause starts from a clean state so that a later
     * {@code analyze()} does not see leftovers of a partially failed analysis.
     *
     * @param other the clause to copy
     */
    public OutFileClause(OutFileClause other) {
        this.filePath = other.filePath;
        this.fileFormatProperties = other.fileFormatProperties;
        this.properties = other.properties == null ? null : Maps.newHashMap(other.properties);
        this.isAnalyzed = other.isAnalyzed;
        if (other.isAnalyzed) {
            // Keep the copy consistent with its isAnalyzed flag.
            this.maxFileSizeBytes = other.maxFileSizeBytes;
            this.deleteExistingFiles = other.deleteExistingFiles;
            this.fileSuffix = other.fileSuffix;
            this.withBom = other.withBom;
            this.brokerDesc = other.brokerDesc;
            this.isLocalOutput = other.isLocalOutput;
            this.successFileName = other.successFileName;
            this.parquetSchemas = new ArrayList<>(other.parquetSchemas);
            this.orcSchemas = new ArrayList<>(other.orcSchemas);
        }
    }

    /**
     * Returns the column separator of the CSV format properties.
     * The cast fails with a ClassCastException when the format properties are not
     * {@link CsvFileFormatProperties}.
     */
    public String getColumnSeparator() {
        CsvFileFormatProperties csvProperties = (CsvFileFormatProperties) fileFormatProperties;
        return csvProperties.getColumnSeparator();
    }

    /**
     * Returns the line delimiter of the CSV format properties.
     * The cast fails with a ClassCastException when the format properties are not
     * {@link CsvFileFormatProperties}.
     */
    public String getLineDelimiter() {
        CsvFileFormatProperties csvProperties = (CsvFileFormatProperties) fileFormatProperties;
        return csvProperties.getLineDelimiter();
    }

    /**
     * Returns the header type of the CSV format properties.
     * The cast fails with a ClassCastException when the format properties are not
     * {@link CsvFileFormatProperties}.
     */
    public String getHeaderType() {
        CsvFileFormatProperties csvProperties = (CsvFileFormatProperties) fileFormatProperties;
        return csvProperties.getHeaderType();
    }

    /** Returns the thrift file format type reported by the format properties. */
    public TFileFormatType getFileFormatType() {
        TFileFormatType formatType = fileFormatProperties.getFileFormatType();
        return formatType;
    }

    /** Returns the maximum size of a single output file, in bytes. */
    public long getMaxFileSizeBytes() {
        return this.maxFileSizeBytes;
    }

    /** Returns the broker descriptor; {@code null} until one is created during property analysis. */
    public BrokerDesc getBrokerDesc() {
        return this.brokerDesc;
    }

    /** Returns the internal (live, mutable) list of parquet column descriptors. */
    public List<TParquetSchema> getParquetSchemas() {
        return this.parquetSchemas;
    }

    /**
     * Validates the file path and properties and, on request, the format-specific schema.
     *
     * @param resultExprs output expressions of the query
     * @param colLabels   output column labels, parallel to {@code resultExprs}
     * @param needFormat  whether the parquet/orc schema should be analyzed as well
     * @throws UserException if the path, the properties or the schema are invalid
     */
    public void analyze(List<Expr> resultExprs, List<String> colLabels, boolean needFormat) throws UserException {
        // If the query stmt is rewritten, the whole stmt is analyzed again. Some fields of this
        // clause have been changed by the first pass (e.g. the scheme header of `filePath` has
        // been removed), so analyzing a second time is not supported and is skipped.
        if (isAnalyzed) {
            return;
        }

        analyzeFilePath();
        analyzeProperties();

        // Exactly one of "local output" and "broker descriptor" must be in effect.
        boolean hasBrokerDesc = brokerDesc != null;
        if (hasBrokerDesc && isLocalOutput) {
            throw new AnalysisException("No need to specify BROKER properties in OUTFILE clause for local file output");
        }
        if (!hasBrokerDesc && !isLocalOutput) {
            throw new AnalysisException("Please specify BROKER properties or check your local file path.");
        }
        isAnalyzed = true;

        // analyze() is called in two situations: during normal query planning, and when the
        // success file is written on FE side after the outfile is done. The latter does not
        // need the format related analysis.
        if (!needFormat) {
            return;
        }
        if (isParquetFormat()) {
            analyzeForParquetFormat(resultExprs, colLabels);
        } else if (isOrcFormat()) {
            analyzeForOrcFormat(resultExprs, colLabels);
        }
    }

    /**
     * Derives the ORC schema from the query output: one (label, orc type) pair per result
     * expression. Must only be called while {@code orcSchemas} is still empty.
     */
    private void genOrcSchema(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
        Preconditions.checkState(this.orcSchemas.isEmpty());
        int column = 0;
        for (Expr resultExpr : resultExprs) {
            String orcType = dorisTypeToOrcTypeMap(resultExpr.getType());
            orcSchemas.add(Pair.of(colLabels.get(column), orcType));
            column++;
        }
    }

    /**
     * Maps a Doris type to the ORC type string written into the generated schema.
     * Nested types (struct, map, array) are mapped recursively.
     *
     * @param dorisType the Doris type of a result column or of a nested element
     * @return the ORC type string
     * @throws AnalysisException for wildcard decimals and for types without a mapping
     */
    private String dorisTypeToOrcTypeMap(Type dorisType) throws AnalysisException {
        PrimitiveType primitiveType = dorisType.getPrimitiveType();
        switch (primitiveType) {
            case NULL_TYPE:
                return "tinyint";
            case BOOLEAN:
            case TINYINT:
            case SMALLINT:
            case INT:
            case BIGINT:
            case FLOAT:
            case DOUBLE:
            case STRING:
                // The ORC name is the lower-cased Doris type name.
                return primitiveType.toString().toLowerCase();
            case HLL:
            case BITMAP:
            case QUANTILE_STATE:
                return "binary";
            case DATEV2:
                return "date";
            case DATETIMEV2:
                return "timestamp";
            case CHAR:
                return "char(" + dorisType.getLength() + ")";
            case VARCHAR:
                return "varchar(" + dorisType.getLength() + ")";
            case IPV4:
                return "int";
            case LARGEINT:
            case DATE:
            case DATETIME:
            case IPV6:
            case VARIANT:
            case JSONB:
                return "string";
            case DECIMALV2:
                if (dorisType.isWildcardDecimal()) {
                    throw new AnalysisException("currently ORC writer do not support WildcardDecimal!");
                }
                // DECIMALV2 is always written with the maximum decimal128 precision and scale 9.
                return String.format("decimal(%d, 9)", ScalarType.MAX_DECIMAL128_PRECISION);
            case DECIMAL32:
            case DECIMAL64:
            case DECIMAL128: {
                if (dorisType.isWildcardDecimal()) {
                    throw new AnalysisException("currently ORC outfile do not support WildcardDecimal!");
                }
                ScalarType decimalType = (ScalarType) dorisType;
                return String.format("decimal(%d, %d)", decimalType.getPrecision(), decimalType.decimalScale());
            }
            case STRUCT: {
                // struct<name1:type1,name2:type2,...>
                StringBuilder structText = new StringBuilder("struct<");
                boolean firstField = true;
                for (StructField field : ((StructType) dorisType).getFields()) {
                    if (!firstField) {
                        structText.append(",");
                    }
                    firstField = false;
                    structText.append(field.getName());
                    structText.append(":");
                    structText.append(dorisTypeToOrcTypeMap(field.getType()));
                }
                return structText.append(">").toString();
            }
            case MAP: {
                // map<keyType,valueType>
                MapType mapType = (MapType) dorisType;
                String keyOrcType = dorisTypeToOrcTypeMap(mapType.getKeyType());
                String valueOrcType = dorisTypeToOrcTypeMap(mapType.getValueType());
                return "map<" + keyOrcType + "," + valueOrcType + ">";
            }
            case ARRAY: {
                // array<itemType>
                String itemOrcType = dorisTypeToOrcTypeMap(((ArrayType) dorisType).getItemType());
                return "array<" + itemOrcType + ">";
            }
            default:
                throw new AnalysisException("currently orc do not support column type: "
                        + primitiveType);
        }
    }

    /**
     * Renders {@code orcSchemas} as a single ORC struct string:
     * {@code struct<name1:type1,name2:type2>}; an empty schema yields {@code struct<>}.
     */
    private String serializeOrcSchema() {
        StringBuilder serialized = new StringBuilder("struct<");
        boolean firstColumn = true;
        for (Pair<String, String> column : this.orcSchemas) {
            if (!firstColumn) {
                serialized.append(",");
            }
            firstColumn = false;
            serialized.append(column.first).append(":").append(column.second);
        }
        serialized.append(">");
        return serialized.toString();
    }

    /**
     * Makes sure an ORC schema exists (generating it from the query output when the user gave
     * none) and checks it against the result expressions: same column count, and for every
     * column a declared ORC type that fits the Doris result type.
     *
     * @throws AnalysisException on a count mismatch, a type mismatch or an unsupported type
     */
    private void analyzeForOrcFormat(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
        if (this.orcSchemas.isEmpty()) {
            genOrcSchema(resultExprs, colLabels);
        }
        // check schema number
        if (resultExprs.size() != this.orcSchemas.size()) {
            throw new AnalysisException("Orc schema number does not equal to select item number");
        }
        // check type
        for (int column = 0; column < this.orcSchemas.size(); ++column) {
            String declaredOrcType = this.orcSchemas.get(column).second;
            PrimitiveType primitiveType = resultExprs.get(column).getType().getPrimitiveType();
            String dorisTypeName = primitiveType.toString();

            // The switch only decides WHAT to expect; the comparison itself happens once below.
            String expectedOrcType;
            // true: declared type must equal the expectation; false: it only has to start with it
            // (parameterized / nested types such as char(10), decimal(27, 9), array<int>).
            boolean exactMatch;
            switch (primitiveType) {
                case NULL_TYPE:
                    expectedOrcType = "tinyint";
                    exactMatch = true;
                    break;
                case BOOLEAN:
                case TINYINT:
                case SMALLINT:
                case INT:
                case BIGINT:
                case FLOAT:
                case DOUBLE:
                case STRING:
                    expectedOrcType = dorisTypeName.toLowerCase();
                    exactMatch = true;
                    break;
                case DATEV2:
                    expectedOrcType = "date";
                    exactMatch = true;
                    break;
                case DATETIMEV2:
                    expectedOrcType = "timestamp";
                    exactMatch = true;
                    break;
                case CHAR:
                    expectedOrcType = "char";
                    exactMatch = false;
                    break;
                case VARCHAR:
                    expectedOrcType = "varchar";
                    exactMatch = false;
                    break;
                case IPV4:
                    expectedOrcType = "int";
                    exactMatch = false;
                    break;
                case LARGEINT:
                case DATE:
                case DATETIME:
                case IPV6:
                case VARIANT:
                case JSONB:
                    expectedOrcType = "string";
                    exactMatch = true;
                    break;
                case DECIMAL32:
                case DECIMAL64:
                case DECIMAL128:
                case DECIMALV2:
                    expectedOrcType = "decimal";
                    exactMatch = false;
                    break;
                case HLL:
                case BITMAP:
                case QUANTILE_STATE:
                    expectedOrcType = "binary";
                    exactMatch = true;
                    break;
                case STRUCT:
                    expectedOrcType = "struct";
                    exactMatch = false;
                    break;
                case MAP:
                    expectedOrcType = "map";
                    exactMatch = false;
                    break;
                case ARRAY:
                    expectedOrcType = "array";
                    exactMatch = false;
                    break;
                default:
                    throw new AnalysisException("Orc format does not support column type: "
                            + primitiveType);
            }
            checkOrcType(declaredOrcType, expectedOrcType, exactMatch, dorisTypeName);
        }
    }

    /**
     * Verifies that a declared ORC type fits the expected one.
     *
     * @param orcType    the ORC type declared in the schema
     * @param expectType the ORC type (or type prefix) required by the Doris type
     * @param isEqual    {@code true} to require equality, {@code false} to require only that
     *                   {@code orcType} starts with {@code expectType}
     * @param dorisType  Doris type name, used in the error message only
     * @throws AnalysisException if the declared type does not fit
     */
    private void checkOrcType(String orcType, String expectType, boolean isEqual, String dorisType)
            throws AnalysisException {
        boolean typeFits = isEqual ? orcType.equals(expectType) : orcType.startsWith(expectType);
        if (!typeFits) {
            throw new AnalysisException("project field type is " + dorisType
                    + ", should use " + expectType + ", but the definition type is " + orcType);
        }
    }


    /**
     * Makes sure parquet column descriptors exist (generating them from the column labels when
     * the user gave no schema) and that there is exactly one per result expression.
     */
    private void analyzeForParquetFormat(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
        if (this.parquetSchemas.isEmpty()) {
            genParquetColumnName(resultExprs, colLabels);
        }
        // check schema number
        boolean sameColumnCount = resultExprs.size() == this.parquetSchemas.size();
        if (!sameColumnCount) {
            throw new AnalysisException("Parquet schema number does not equal to select item number");
        }
    }

    /**
     * Appends one parquet column descriptor per result expression, carrying only the column
     * name (taken from the label at the same position).
     */
    private void genParquetColumnName(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
        final int columnCount = resultExprs.size();
        for (int column = 0; column < columnCount; column++) {
            TParquetSchema columnSchema = new TParquetSchema();
            columnSchema.schema_column_name = colLabels.get(column);
            parquetSchemas.add(columnSchema);
        }
    }

    /**
     * Validates and rewrites {@code filePath}:
     * decides whether the output is local, strips the local scheme prefix, inserts the
     * configured HDFS name service when the path does not contain it, and normalizes the URI.
     *
     * @throws AnalysisException if the path is missing, local output is disabled, or the path
     *                           is not a valid URI
     */
    private void analyzeFilePath() throws AnalysisException {
        if (Strings.isNullOrEmpty(filePath)) {
            throw new AnalysisException("Must specify file in OUTFILE clause");
        }

        boolean localTarget = filePath.startsWith(LOCAL_FILE_PREFIX);
        if (localTarget) {
            if (!Config.enable_outfile_to_local) {
                throw new AnalysisException("Exporting results to local disk is not allowed."
                        + " To enable this feature, you need to add `enable_outfile_to_local=true`"
                        + " in fe.conf and restart FE");
            }
            isLocalOutput = true;
            // Drop "file://" but keep the last '/', so the remainder is an absolute path.
            int keepFrom = LOCAL_FILE_PREFIX.length() - 1;
            filePath = filePath.substring(keepFrom);
        } else {
            isLocalOutput = false;
        }

        if (properties != null) {
            // With a broker, the name-service key carries the broker prefix.
            String nameServicesKey = HdfsResource.DSF_NAMESERVICES;
            if (properties.containsKey(PROP_BROKER_NAME)) {
                nameServicesKey = BROKER_PROP_PREFIX + HdfsResource.DSF_NAMESERVICES;
            }
            String dfsNameServices = properties.getOrDefault(nameServicesKey, "");
            boolean nameServiceConfigured = !Strings.isNullOrEmpty(dfsNameServices);
            if (nameServiceConfigured && !filePath.contains(dfsNameServices)) {
                filePath = filePath.replace(HDFS_FILE_PREFIX, HDFS_FILE_PREFIX + dfsNameServices);
            }
        }

        // delete repeated '/'
        try {
            URI normalizedUri = new URI(filePath).normalize();
            filePath = normalizedUri.toString();
        } catch (URISyntaxException e) {
            throw new AnalysisException("Can not normalize the URI, error: " + e.getMessage());
        }
        if (Strings.isNullOrEmpty(filePath)) {
            throw new AnalysisException("Must specify file in OUTFILE clause");
        }
    }

    /**
     * Interprets the user properties of the clause.
     * <p>
     * Does nothing when no properties were given. Otherwise works on a case-insensitive copy:
     * builds the broker descriptor, lets the format properties consume their keys, then reads
     * max file size, delete-existing-files, file suffix, BOM flag and success file name, and
     * finally the optional parquet/orc schema. Each handled key is removed from the copy.
     *
     * @throws UserException if any property value is invalid or not allowed
     */
    private void analyzeProperties() throws UserException {
        if (properties == null || properties.isEmpty()) {
            return;
        }
        // Work on a copy, because handled keys are removed along the way.
        Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
        copiedProps.putAll(properties);

        analyzeBrokerDesc(copiedProps);

        fileFormatProperties.analyzeFileFormatProperties(copiedProps, true);
        // check if compression type for csv is supported
        if (fileFormatProperties instanceof CsvFileFormatProperties) {
            ((CsvFileFormatProperties) fileFormatProperties).checkSupportedCompressionType(true);
        }

        if (copiedProps.containsKey(PROP_MAX_FILE_SIZE)) {
            String maxFileSizeText = copiedProps.get(PROP_MAX_FILE_SIZE);
            maxFileSizeBytes = ParseUtil.analyzeDataVolume(maxFileSizeText);
            if (maxFileSizeBytes < MIN_FILE_SIZE_BYTES) {
                throw new AnalysisException("max file size should larger than 5MB. Given: " + maxFileSizeBytes);
            }
            copiedProps.remove(PROP_MAX_FILE_SIZE);
        }

        if (copiedProps.containsKey(PROP_DELETE_EXISTING_FILES)) {
            String deleteFlagText = copiedProps.get(PROP_DELETE_EXISTING_FILES);
            deleteExistingFiles = Boolean.parseBoolean(deleteFlagText);
            // Deleting must additionally be enabled on FE side.
            if (deleteExistingFiles && !Config.enable_delete_existing_files) {
                throw new AnalysisException("Deleting existing files is not allowed."
                        + " To enable this feature, you need to add `enable_delete_existing_files=true`"
                        + " in fe.conf");
            }
            copiedProps.remove(PROP_DELETE_EXISTING_FILES);
        }

        if (copiedProps.containsKey(PROP_FILE_SUFFIX)) {
            fileSuffix = copiedProps.get(PROP_FILE_SUFFIX);
            copiedProps.remove(PROP_FILE_SUFFIX);
        }

        if (copiedProps.containsKey(PROP_WITH_BOM)) {
            withBom = Boolean.parseBoolean(copiedProps.get(PROP_WITH_BOM));
            copiedProps.remove(PROP_WITH_BOM);
        }

        if (copiedProps.containsKey(PROP_SUCCESS_FILE_NAME)) {
            successFileName = copiedProps.get(PROP_SUCCESS_FILE_NAME);
            FeNameFormat.checkOutfileSuccessFileName("file name", successFileName);
            copiedProps.remove(PROP_SUCCESS_FILE_NAME);
        }

        // Optional user-declared schema of the output file.
        if (isParquetFormat()) {
            getParquetProperties(copiedProps);
        }
        if (isOrcFormat()) {
            getOrcProperties(copiedProps);
        }
    }

    /**
     * Creates {@code brokerDesc} from the copied properties unless the output is local.
     * <p>
     * The following two situations will generate the corresponding broker descriptor:
     * 1. broker: with broker name
     * 2. s3: with s3 pattern path, without broker name
     *
     * @param copiedProps case-insensitive working copy of the user properties
     * @throws UserException if the broker descriptor or the default FS cannot be derived
     */
    private void analyzeBrokerDesc(Map<String, String> copiedProps) throws UserException {
        // Local file output needs no broker properties, and analyze() requires brokerDesc to be
        // null in that case, so there is nothing to build.
        if (isLocalOutput) {
            return;
        }
        String brokerName = copiedProps.get(PROP_BROKER_NAME);
        brokerDesc = new BrokerDesc(brokerName, copiedProps);
        /*
         * HDFS export and URI handling:
         *
         * 1. URI extraction from user input matches keys case-insensitively
         *    ("URI", "Uri", "uRI", ...) to tolerate non-standard input.
         *
         * 2. If FE fails to pass 'fs.defaultFS' to BE in an OUTFILE scenario, the export may
         *    fail without raising an error (it looks successful), which is misleading and can
         *    cause silent data loss or inconsistencies.
         *
         * 3. As a temporary safeguard, for HDFS storage the default FS is extracted from the
         *    file path and forced into the backend config as 'fs.defaultFS'.
         *
         * 4. Long term, a consistent parameter specification should be defined and enforced
         *    across all user-facing entry points (FE input validation, broker desc
         *    normalization, ...), so that critical configs like 'fs.defaultFS' cannot go missing.
         *
         * 5. Suggested improvements:
         *    - normalize all parameter keys (e.g. to lowercase)
         *    - centralize HDFS URI parsing logic
         *    - validate in FE and reject incomplete or malformed configs
         */
        boolean hdfsStorage = StorageBackend.StorageType.HDFS.equals(brokerDesc.getStorageType());
        if (hdfsStorage) {
            String defaultFs = HdfsPropertiesUtils.extractDefaultFsFromPath(filePath);
            brokerDesc.getBackendConfigProperties().put(HdfsProperties.HDFS_DEFAULT_FS_NAME, defaultFs);
        }
    }

    /**
     * Parses the optional user-declared parquet schema into {@code parquetSchemas}.
     * <p>
     * example:
     * SELECT citycode FROM table1 INTO OUTFILE "file:///root/doris/"
     * FORMAT AS PARQUET PROPERTIES ("schema"="required,int32,siteid;", "parquet.compression"="snappy");
     * <p>
     * schema: defines the schema of the parquet file. Every column consists of 3 fields:
     * repetition type, data type, column name. Multiple columns are separated by `;`.
     * Blanks are removed and the text is lower-cased before parsing.
     * <p>
     * Properties prefixed with 'parquet.' define properties of the parquet file;
     * currently only supports: compression, disable_dictionary, version
     *
     * @param copiedProps working copy of the user properties; "schema" is removed once parsed
     * @throws AnalysisException if the schema is empty or a column definition is invalid
     */
    private void getParquetProperties(Map<String, String> copiedProps) throws AnalysisException {
        // check schema. if schema is not set, Doris will gen schema by select items
        // Note: These codes are useless and outdated.
        final String rawSchema = copiedProps.get(SCHEMA);
        if (rawSchema == null) {
            return;
        }
        if (rawSchema.isEmpty()) {
            throw new AnalysisException("Parquet schema property should not be empty");
        }
        String normalizedSchema = rawSchema.replace(" ", "").toLowerCase();
        for (String columnDef : normalizedSchema.split(";")) {
            String[] parts = columnDef.split(",");
            if (parts.length != 3) {
                throw new AnalysisException("must only contains repetition type/column type/column name");
            }
            String repetition = parts[0];
            String dataType = parts[1];
            String columnName = parts[2];
            if (!PARQUET_REPETITION_TYPE_MAP.containsKey(repetition)) {
                throw new AnalysisException("unknown repetition type");
            }
            if (!"required".equalsIgnoreCase(repetition)) {
                throw new AnalysisException("currently only support required type");
            }
            if (!PARQUET_DATA_TYPE_MAP.containsKey(dataType)) {
                throw new AnalysisException("data type is not supported:" + dataType);
            }
            TParquetSchema columnSchema = new TParquetSchema();
            columnSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get(repetition);
            columnSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get(dataType);
            columnSchema.schema_column_name = columnName;
            parquetSchemas.add(columnSchema);
        }
        copiedProps.remove(SCHEMA);
    }

    /**
     * Parses the optional user-declared ORC schema into {@code orcSchemas}.
     * <p>
     * The "schema" property is a `;`-separated list of columns, each written as
     * {@code column_name,type}. Blanks are removed and the text is lower-cased before parsing.
     * The type must be one of {@code ORC_DATA_TYPE} or a decimal of the form
     * {@code decimal(precision,scale)}.
     * <p>
     * A decimal type contains a comma itself, so a column such as {@code c1,decimal(10,2)}
     * splits into three pieces; the last two are joined back into the type. The
     * {@code (precision,scale)} part is what follows the word "decimal".
     * If the property is not set, nothing happens and the schema is generated from the select
     * items later.
     *
     * @param copiedProps working copy of the user properties; "schema" is removed once parsed
     * @throws AnalysisException if the schema is empty or a column definition is invalid
     */
    private void getOrcProperties(Map<String, String> copiedProps) throws AnalysisException {
        // check schema. if schema is not set, Doris will gen schema by select items
        String schema = copiedProps.get(SCHEMA);
        if (schema == null) {
            return;
        }
        if (schema.isEmpty()) {
            throw new AnalysisException("Orc schema property should not be empty");
        }
        schema = schema.replace(" ", "");
        schema = schema.toLowerCase();
        String[] schemas = schema.split(";");
        for (String item : schemas) {
            String[] fields = item.split(",");
            String columnName;
            String columnType;
            if (fields.length == 2) {
                columnName = fields[0];
                columnType = fields[1];
            } else if (fields.length == 3 && fields[1].startsWith("decimal")) {
                // "name,decimal(p,s)" was split inside the parentheses: re-join the type.
                columnName = fields[0];
                columnType = fields[1] + "," + fields[2];
            } else {
                throw new AnalysisException("must only contains type and column name");
            }

            if (!ORC_DATA_TYPE.contains(columnType)) {
                if (!columnType.startsWith("decimal")) {
                    throw new AnalysisException("data type is not supported:" + columnType);
                }
                String errorMsg = "Format of decimal type must be decimal(%d,%d)";
                // Everything after the word "decimal" must be "(precision,scale)".
                String precisionAndScale = columnType.substring("decimal".length());
                if (!precisionAndScale.startsWith("(") || !precisionAndScale.endsWith(")")) {
                    throw new AnalysisException(errorMsg);
                }
                String[] str = precisionAndScale.substring(1, precisionAndScale.length() - 1).split(",");
                if (str.length != 2 || !str[0].matches("\\d+") || !str[1].matches("\\d+")) {
                    throw new AnalysisException(errorMsg);
                }
            }
            orcSchemas.add(Pair.of(columnName, columnType));
        }
        copiedProps.remove(SCHEMA);
    }

    /** Returns true when the format properties report the parquet file format. */
    private boolean isParquetFormat() {
        TFileFormatType formatType = fileFormatProperties.getFileFormatType();
        return formatType == TFileFormatType.FORMAT_PARQUET;
    }

    /** Returns true when the format properties report the ORC file format. */
    private boolean isOrcFormat() {
        TFileFormatType formatType = fileFormatProperties.getFileFormatType();
        return formatType == TFileFormatType.FORMAT_ORC;
    }

    /** Returns the target path; after analysis this is the rewritten (normalized) path. */
    public String getFilePath() {
        return this.filePath;
    }

    /** Returns the success file name; empty when the property was not given. */
    public String getSuccessFileName() {
        return this.successFileName;
    }

    /** Returns a copy of this clause created through the copy constructor. */
    @Override
    public OutFileClause clone() {
        OutFileClause copy = new OutFileClause(this);
        return copy;
    }

    /**
     * Renders the clause as SQL text:
     * {@code  INTO OUTFILE '<path>' FORMAT AS <format>[ PROPERTIES(...)]}.
     * <p>
     * The file path is wrapped in a complete pair of single quotes; without the closing quote
     * the generated statement would contain an unterminated string literal.
     * The PROPERTIES part is emitted only when properties are present.
     *
     * @return the SQL fragment, starting with a blank
     */
    public String toSql() {
        StringBuilder sb = new StringBuilder();
        sb.append(" INTO OUTFILE '").append(filePath).append("'")
                .append(" FORMAT AS ").append(fileFormatProperties.getFormatName());
        if (properties != null && !properties.isEmpty()) {
            sb.append(" PROPERTIES(");
            sb.append(new PrintableMap<>(properties, " = ", true, false));
            sb.append(")");
        }
        return sb.toString();
    }

    /**
     * Renders the clause with every literal replaced by a placeholder.
     * <p>
     * NOTE(review): the text contains an opening single quote without a closing one. It is
     * reproduced unchanged here because digest text may be hashed or compared elsewhere;
     * confirm with the digest consumers before changing it.
     *
     * @return the digest fragment, starting with a blank
     */
    public String toDigest() {
        String digest = " INTO OUTFILE '" + " ? " + " FORMAT AS " + " ? ";
        boolean hasProperties = properties != null && !properties.isEmpty();
        if (hasProperties) {
            digest = digest + " PROPERTIES(" + " ? " + ")";
        }
        return digest;
    }

    /**
     * Builds the thrift sink options from the current state of the clause: path and format,
     * format-specific options, size/suffix/BOM/delete settings, broker properties when a
     * broker descriptor exists, the success file name when set, and the parquet or orc schema
     * for those formats.
     *
     * @return a newly created options object
     */
    public TResultFileSinkOptions toSinkOptions() {
        TFileFormatType formatType = fileFormatProperties.getFileFormatType();
        TResultFileSinkOptions options = new TResultFileSinkOptions(filePath, formatType);
        // Let the format properties fill in their own options.
        fileFormatProperties.fullTResultFileSinkOptions(options);

        options.setMaxFileSizeBytes(maxFileSizeBytes);
        options.setDeleteExistingFiles(deleteExistingFiles);
        options.setFileSuffix(fileSuffix);
        options.setWithBom(withBom);

        // broker_addresses of the options will be set in Coordinator,
        // because the broker nearest to the result sink node has to be chosen.
        if (brokerDesc != null) {
            options.setBrokerProperties(brokerDesc.getBackendConfigProperties());
        }

        boolean hasSuccessFileName = !Strings.isNullOrEmpty(successFileName);
        if (hasSuccessFileName) {
            options.setSuccessFileName(successFileName);
        }

        if (isParquetFormat()) {
            options.setParquetSchemas(parquetSchemas);
        }
        if (isOrcFormat()) {
            options.setOrcSchema(serializeOrcSchema());
        }
        return options;
    }
}


